package com.tca.common.learning.webflux.reactor.source;

import com.google.common.collect.Lists;
import com.tca.common.core.utils.ValidateUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

import java.util.List;

/**
 * @author zhoua
 * @date 2022/1/8 14:59
 */
public class MyListFlux<T> extends Flux<T> {

    private List<T> source;

    public MyListFlux(List<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        coreSubscriber.onSubscribe(new MyListSubscription<T>(source, coreSubscriber));
    }

    static class MyListSubscription<T> implements Subscription {

        private List<T> source;

        private Subscriber<? super T> subscriber;

        private int index;

        private boolean canceled;

        public MyListSubscription(List<T> source, Subscriber<? super T> subscriber) {
            this.source = source;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long l) {
            if (this.canceled || ValidateUtils.isEmpty(source)) {
                return;
            }
            for (long i = 0; i < l && index < source.size(); i++) {
                subscriber.onNext(source.get(index++));
            }
            if (index == source.size()) {
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            this.canceled = true;
        }
    }

    public static void main(String[] args) {
        new MyListFlux<String>(Lists.newArrayList("A", "B"))
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        System.out.println("onSubscribe");
                        subscription.request(10);
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                        System.out.println("=====");
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
    }
}
